
#include "config.h"

#include <dirent.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBLSV

#include "fileinfo.h"
#include "md_map.h"
#include "net_global.h"
#include "stor_rpc.h"
#include "vnode.h"
#include "volume_ctl.h"
#include "volume_proto.h"
#include "yid.h"
#include "ypage.h"
#include "schedule.h"

#include "lba_lock.h"
#include "lsv_bitmap.h"
#include "lsv_volume.h"
#include "lsv_bitmap_internal.h"
#include "lsv_lib.h"
#include "lsv_rcache.h"
#include "lsv_volume_proto.h"
#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"
#include "row2_bitmap.h"
#include "row3_volume_proto_io.h"

#include <buffer.h>
#include <stack.h>
#include "types.h"

#define ALLOCATE_TEST   1

typedef struct
{
        uint32_t chunk_id;
}row_chunk_tlb_t;

typedef struct 
{
        row_chunk_tlb_t tlbs[0];
}row_test_volume_t;

static inline int row_test_volume_tlb_count(uint64_t vol_size)
{
        if(vol_size % CHUNK_SIZE)
                vol_size += CHUNK_SIZE;

        return vol_size / CHUNK_SIZE;
}

typedef struct {
        void *buffer;
        uint64_t offset;
        uint32_t length;
} row_io_t;

static inline void row_lba_split(uint64_t offset, uint32_t length, void *buffer, row_io_t *ios, uint32_t *segs)
{
        int i, split;
        row_io_t *io;
        uint64_t _offset;
        uint32_t left;

        left = length;
        _offset = offset;
        split = 0;
        for (i = 0; left > 0; i++) {
                io = &ios[i];

                io->offset = _offset;
                io->length = min_t(uint32_t, left, CHUNK_SIZE - _offset % CHUNK_SIZE);
                io->buffer = (uint8_t *)buffer + (_offset - offset);

                left -= io->length;
                _offset += io->length;

                split++;
        }

        // assert(split > 0);

        *segs = split;
}

int row_test_volume_save_header(lsv_volume_proto_t *lsv_info)
{
        return lsv_bitmap_write_chunk(lsv_info, lsv_info->u.bitmap_chunk_id, 0, CHUNK_SIZE, lsv_info->test_volume_context);
}
int row_test_volume_load_header(lsv_volume_proto_t *lsv_info)
{
        return lsv_bitmap_read_chunk(lsv_info, 0, lsv_info->u.bitmap_chunk_id, 0, CHUNK_SIZE, lsv_info->test_volume_context);
}

int row_test_volume_init(volume_proto_t *volume_proto, int create_new)
{
        int ret = 0;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info; 
        uint32_t head_size = CHUNK_SIZE;//sizeof(row_test_volume_t) + row_test_volume_tlb_count(lsv_vol->size) * sizeof(row_chunk_tlb_t);
        row_test_volume_t *test_vol = lsv_info->test_volume_context = xmalloc(head_size);

        if(create_new) {
                memset(test_vol, 0, head_size);
        
                ret = lsv_bitmap_alloc_chunk(lsv_info, &lsv_info->u.bitmap_chunk_id, 1);
                if(ret)
                        return ret;

                ret = row_test_volume_save_header(lsv_info);
        }
        else {
                ret = row_test_volume_load_header(lsv_info);
        }

#if !ALLOCATE_TEST
        int i;
        char tmp[512] = {0};
        for (i = 0; i < CHUNK_SIZE / sizeof(row_chunk_tlb_t); i++)
                test_vol->tlbs[i].chunk_id = i + 100;

        ret = lsv_bitmap_write_data(lsv_info, i, 0, 512, tmp);
#endif

        return ret;
}

int row_test_volume_write(volume_proto_t *volume_proto, const io_t *io, const buffer_t *buf)
{
        int ret = 0;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        row_test_volume_t *test_vol = lsv_info->test_volume_context;
        row_io_t *ios = (row_io_t *)xmalloc(sizeof(row_io_t) * (io->size / CHUNK_SIZE + 2));
        uint32_t segs;

        void * _buffer = xmalloc(io->size);

        mbuffer_get(buf, _buffer, io->size);

        row_lba_split(io->offset, io->size, _buffer, ios, &segs);

        for(int i=0;i<segs;i++) {
                row_chunk_tlb_t *tlb = &test_vol->tlbs[ios[i].offset / CHUNK_SIZE];
                if(!tlb->chunk_id) {
                        uint32_t new_chunk_id;
                        ret = lsv_bitmap_alloc_chunk(lsv_info, &new_chunk_id, 1);
                        if(ret)
                                 goto fre;

                        if(tlb->chunk_id) //other IOs have already done it.
                        {
                                DINFO("double allocated, free new chunk id %d.\r\n", new_chunk_id);
                                lsv_bitmap_free_chunk(lsv_info, new_chunk_id);
                        }
                        else
                                tlb->chunk_id = new_chunk_id;

                        ret = row_test_volume_save_header(lsv_info);
                        if(ret)
                                 goto fre;
                }

                DINFO("write %ld, %d\r\n", ios[i].offset, ios[i].length);
                ret = lsv_bitmap_write_data(lsv_info, tlb->chunk_id, ios[i].offset % CHUNK_SIZE, ios[i].length, ios[i].buffer);
                if(ret)
                        goto fre;
        }

#if ALLOCATE_TEST
        static int step = 0;
        if(step ++ == 1000) {
                DINFO("begin dump:\r\n");
                for(int i=0;i<1024;i++)
                        if(test_vol->tlbs[i].chunk_id)
                                DINFO("%02d: %d\r\n", i, test_vol->tlbs[i].chunk_id);
                DINFO("end dump\r\n");
                step = 0;
        }
#endif

fre:
        xfree(_buffer);
        xfree(ios);
        
        return ret;
}

int row_test_volume_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret = 0;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        row_test_volume_t *test_vol = lsv_info->test_volume_context;
        row_io_t *ios = (row_io_t *)xmalloc(sizeof(row_io_t) * (io->size / CHUNK_SIZE + 2));
        uint32_t segs;

        void * _buffer = xmalloc(io->size);

        row_lba_split(io->offset, io->size, _buffer, ios, &segs);

        for(int i=0;i<segs;i++) {
                row_chunk_tlb_t *tlb = &test_vol->tlbs[ios[i].offset / CHUNK_SIZE];
                if(!tlb->chunk_id) {
                        memset(ios[i].buffer, 0, ios[i].length);
                }

                DINFO("read %ld, %d\r\n", ios[i].offset, ios[i].length);
                ret = lsv_bitmap_read_chunk(lsv_info, 0, tlb->chunk_id, ios[i].offset % CHUNK_SIZE, ios[i].length, ios[i].buffer);
                if(ret)
                        goto fre;
        }

#if 1
        mbuffer_copy(buf, _buffer, io->size);
        xfree(_buffer);
#else
        mbuffer_attach(buf, _buffer, io->size, _buffer);
#endif

        YASSERT(buf->len == io->size);

fre:
        if(ret)
                xfree(_buffer);

        xfree(ios);
        //xfree(_buffer);
        
        return ret;
}

int row_test_volume_write_align(volume_proto_t *volume_proto, uint64_t offset, uint32_t length, uint8_t *buffer)
{
        int ret = 0;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        row_io_t *ios = (row_io_t *)xmalloc(sizeof(row_io_t) * (length / CHUNK_SIZE + 2));
        uint32_t segs;

        row_lba_split(offset, length, buffer, ios, &segs);

        for(int i=0;i<segs;i++) {
                ret = lsv_bitmap_write_data(lsv_info, ios[i].offset / CHUNK_SIZE, ios[i].offset % CHUNK_SIZE, ios[i].length, ios[i].buffer);
                if(ret)
                        goto fre;
        }

fre:
        xfree(ios);
        
        return ret;
}

int row_test_volume_read_align(volume_proto_t *volume_proto, uint64_t offset, uint32_t length, uint8_t *buffer)
{
        int ret = 0;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;
        row_io_t *ios = (row_io_t *)xmalloc(sizeof(row_io_t) * (length / CHUNK_SIZE + 2));
        uint32_t segs;

        row_lba_split(offset, length, buffer, ios, &segs);

        for(int i=0;i<segs;i++) {
                ret = lsv_bitmap_read_chunk(lsv_info, 0, ios[i].offset / CHUNK_SIZE, ios[i].offset % CHUNK_SIZE, ios[i].length, ios[i].buffer);
                if(ret)
                        goto fre;
        }

fre:
        xfree(ios);
        
        return ret;
}